Skip to content

Add log cache log streaming support#1338

Draft
ZPascal wants to merge 18 commits intocloudfoundry:mainfrom
ZPascal:add-log-cache-log-streaming-support
Draft

Add log cache log streaming support#1338
ZPascal wants to merge 18 commits intocloudfoundry:mainfrom
ZPascal:add-log-cache-log-streaming-support

Conversation

@ZPascal
Copy link

@ZPascal ZPascal commented Mar 3, 2026

Summary

This PR adds live log streaming via Log Cache to cf-java-client – the Java equivalent of
cf tail --follow and the Go [logcache.Walk()][go-walk] API.

Previously, the only Log Cache read path was logsRecent (a single snapshot GET /api/v1/read).
There was no way to continuously stream new log envelopes without polling manually.
This change introduces a first-class logsTail API across all three relevant modules
(cloudfoundry-client, cloudfoundry-client-reactor, cloudfoundry-operations).


Motivation

Cloud Foundry dropped the legacy Loggregator Doppler streaming endpoint
(DopplerClient.stream()) in Loggregator ≥ 107.0 (CFD ≥ 24.3 / TAS ≥ 4.0).
The Go CF CLI replaced it with a polling loop over the Log Cache
[/api/v1/read][log-cache-read] endpoint – that is what cf tail --follow does today.
This PR brings the same capability to Java consumers.


Changes

Changes in detail

cloudfoundry-client — new TailLogsRequest value object and LogCacheClient API

New file: _TailLogsRequest.java (Immutables @Value.Immutable)

Field Type Default Description
sourceId String App / service GUID (required)
startTime Long (nullable) now − 5 s (ns) Cursor start time in UNIX nanoseconds
envelopeTypes List<EnvelopeType> (nullable) all types Envelope type filter
nameFilter String (nullable) none Regex name filter (Log Cache ≥ 2.1.0)
pollInterval Duration 250 ms Back-off between polls when no new data

LogCacheClient interface — new method:

/**
 * Continuously polls Log Cache /api/v1/read and streams new Envelopes as they appear.
 * Equivalent to the Go logcache.Walk() API and `cf tail --follow`.
 * The Flux never completes on its own – cancel the subscription to stop streaming.
 */
Flux<Envelope> logsTail(TailLogsRequest request);

cloudfoundry-client-reactor — non-blocking polling implementation

ReactorLogCacheEndpoints.logsTail() implements the walk loop fully non-blocking
(no Thread.sleep). The algorithm mirrors the Go [logcache.Walk()][go-walk]:

  1. Cursor – an AtomicLong starts at startTime (or now − 5 s).
  2. PollFlux.defer builds a fresh ReadRequest from the current cursor on every
    repetition and calls GET /api/v1/read/{sourceId}?start_time=cursor.
  3. Emit – envelopes are sorted ascending by timestamp; the cursor advances to
    lastTimestamp + 1; each envelope is emitted individually downstream.
  4. Back-offrepeatWhen inspects the per-cycle item count: when it is 0
    (empty batch) a Mono.delay(pollInterval) is inserted before the next poll;
    When envelopes are received, the next poll starts immediately.
  5. Cancellation – the Flux is infinite; the caller cancels the subscription to stop.
Flux.defer(buildReadRequest)
    .onErrorReturn(ReadResponse.empty())
    .flatMapMany(sortAndAdvanceCursor)
    .repeatWhen(count == 0 → Mono.delay(pollInterval), else → immediate)

cloudfoundry-operationsApplications.logsTail()

DefaultApplications implements the new Applications interface method by delegating
to LogCacheClient:

@Override
public Flux<org.cloudfoundry.logcache.v1.Envelope> logsTail(TailLogsRequest request) {
    return this.logCacheClient
            .flatMapMany(client -> client.logsTail(request))
            .transform(OperationsLogging.log("Tail Application Logs"))
            .checkpoint();
}

Applications interface gains:

/**
 * Continuously streams application log envelopes from Log Cache.
 * The returned Flux is infinite – cancel it to stop streaming.
 * Java equivalent of `cf tail --follow`.
 */
Flux<Envelope> logsTail(TailLogsRequest request);

Tests

Four unit tests cover logsTail in DefaultApplicationsTest:

Test What it verifies
logsTailLogCache Happy path: a single LOG/OUT envelope is forwarded correctly
logsTailLogCacheMultipleEnvelopes Multiple envelopes: 3 envelopes with types OUT → ERR → OUT are all emitted in order
logsTailLogCacheError Error path: a RuntimeException from the client propagates unchanged to the subscriber
logsTailLogCacheOutAndErrEnvelopes OUT + ERR types: both stdout and stderr envelopes are forwarded without filtering
Tests run: 120, Failures: 0, Errors: 0, Skipped: 0

Usage Example

// Stream live logs for an application
TailLogsRequest request = TailLogsRequest.builder()
        .sourceId(applicationGuid)
        .envelopeTypes(List.of(EnvelopeType.LOG))   // optional: logs only
        .pollInterval(Duration.ofMillis(250))        // optional: default 250 ms
        .build();

logCacheClient.logsTail(request)
        .filter(e -> e.getLog() != null)
        .map(e -> e.getLog().getPayloadAsText())
        .subscribe(System.out::println);             // cancel() to stop

Or via the high-level Operations API:

cloudFoundryOperations.applications()
        .logsTail(TailLogsRequest.builder().sourceId(appGuid).build())
        .filter(e -> e.getLog() != null)
        .map(e -> e.getLog().getPayloadAsText())
        .subscribe(System.out::println);

Relation to existing API

Method Transport Completes? Use case
DopplerClient.stream() ⚠️ deprecated WebSocket / Doppler Yes (server closes) Legacy streaming (Loggregator < 107.0)
LogCacheClient.recentLogs() HTTP GET (single) Yes Fetch last N log lines
LogCacheClient.logsTail() ✅ new HTTP GET (polling loop) Never Live streaming (Loggregator ≥ 107.0)

Checklist

  • New _TailLogsRequest Immutables value object
  • LogCacheClient.logsTail() interface method
  • ReactorLogCacheEndpoints.logsTail() — fully non-blocking Reactor implementation
  • _ReactorLogCacheClient.logsTail() — delegate override
  • Applications.logsTail() — high-level Operations API method
  • DefaultApplications.logsTail() — implementation
  • Unit test logsTailLogCache in DefaultApplicationsTest
  • All 117 DefaultApplicationsTest tests pass
  • Execute the integration tests

Notes

  • go-walk
  • log-cache-read
  • Work was done with the assistance of Claude (claude-sonnet-4-6, Anthropic) via GitHub Copilot.

ZPascal and others added 18 commits February 10, 2026 22:57
* because of the findFirst() on the envelopes, it could be type OUT or ERR, so we don't really care, as long as the payload is the same.
Also, we don't care about the precise argument to the recentLogs call
The old "Applications.logs" method is keept for
compatibility and when a log stream is needed.

Adding new "logsRecent" method to access the logCache.

integration-tests "ApplicationTest.logs" and
"ApplicationTest.logsRecent" work.

Minor changes in DefaultApplicationsTest in other JUnit tests
to make them execute in Eclipse.
@ZPascal ZPascal force-pushed the add-log-cache-log-streaming-support branch from 606817f to 08fb796 Compare March 7, 2026 14:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants